跳到主要内容

Redis 实现令牌桶限流算法

令牌桶算法原理

令牌桶算法是一种常用的限流算法,其核心思想是以固定的速率向一个有限容量的桶中添加令牌,当请求到达时需要消耗令牌才能被处理。如果桶中没有足够的令牌,请求将被拒绝或延迟处理。

算法优势

相比其他限流算法,令牌桶算法具有以下优势:

  1. 允许突发流量: 桶中积累的令牌可以处理短时间的突发请求
  2. 平滑限流: 通过令牌生成速率控制平均处理速率
  3. 灵活配置: 可以根据业务需求调整令牌生成速率和桶容量

Redis 实现原理

在分布式系统中,我们需要使用 Redis 来实现分布式令牌桶,确保多个服务实例之间的限流策略一致。

Redis 数据结构

我们使用 Redis 的 Hash 结构存储令牌桶状态:

type TokenBucket struct {
Tokens float64 `json:"tokens"` // 当前令牌数
LastTime int64 `json:"last_time"` // 上次更新时间戳
}

Redis Key 设计:

  • Key: rate_limit:token_bucket:{resource_id}
  • Hash Fields:
    • tokens: 当前令牌数量
    • last_time: 最后更新时间戳

Lua 脚本实现

为了保证操作的原子性,我们使用 Lua 脚本在 Redis 中实现令牌桶逻辑:

-- 令牌桶限流 Lua 脚本
-- KEYS[1]: 令牌桶的 Redis key
-- ARGV[1]: 桶容量 (capacity)
-- ARGV[2]: 令牌生成速率 (rate per second)
-- ARGV[3]: 请求的令牌数量 (requested tokens)
-- ARGV[4]: 当前时间戳 (current timestamp)

local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])
local now = tonumber(ARGV[4])

-- 获取当前令牌桶状态
local bucket = redis.call('HMGET', key, 'tokens', 'last_time')
local tokens = tonumber(bucket[1]) or capacity
local last_time = tonumber(bucket[2]) or now

-- 计算需要添加的令牌数
local elapsed = now - last_time
local new_tokens = math.min(capacity, tokens + elapsed * rate)

-- 检查是否有足够的令牌
if new_tokens >= requested then
-- 消耗令牌并更新状态
new_tokens = new_tokens - requested
redis.call('HMSET', key, 'tokens', new_tokens, 'last_time', now)
redis.call('EXPIRE', key, 3600) -- 设置过期时间
return 1 -- 允许请求
else
-- 更新时间戳但不消耗令牌
redis.call('HMSET', key, 'tokens', new_tokens, 'last_time', now)
redis.call('EXPIRE', key, 3600)
return 0 -- 拒绝请求
end

Go 客户端实现

package ratelimit

import (
"context"
"fmt"
"time"

"github.com/go-redis/redis/v8"
)

type TokenBucketLimiter struct {
client redis.Cmdable
script *redis.Script
capacity float64 // 桶容量
rate float64 // 令牌生成速率 (tokens/second)
}

// Lua 脚本内容
const tokenBucketScript = `
-- (上面的 Lua 脚本内容)
`

func NewTokenBucketLimiter(client redis.Cmdable, capacity, rate float64) *TokenBucketLimiter {
return &TokenBucketLimiter{
client: client,
script: redis.NewScript(tokenBucketScript),
capacity: capacity,
rate: rate,
}
}

// Allow 检查是否允许请求通过
func (l *TokenBucketLimiter) Allow(ctx context.Context, key string, tokens int) (bool, error) {
now := float64(time.Now().Unix())

result, err := l.script.Run(ctx, l.client,
[]string{key},
l.capacity, l.rate, tokens, now,
).Result()

if err != nil {
return false, fmt.Errorf("failed to execute rate limit script: %w", err)
}

allowed, ok := result.(int64)
if !ok {
return false, fmt.Errorf("unexpected script result type: %T", result)
}

return allowed == 1, nil
}

// AllowN 批量消耗多个令牌
func (l *TokenBucketLimiter) AllowN(ctx context.Context, key string, n int) (bool, error) {
return l.Allow(ctx, key, n)
}

实际应用场景

1. API 接口限流

// API 网关中的使用示例
func (h *APIHandler) RateLimitMiddleware() gin.HandlerFunc {
limiter := NewTokenBucketLimiter(h.redis, 100, 10) // 桶容量100,每秒10个令牌

return func(c *gin.Context) {
userID := c.GetHeader("User-ID")
key := fmt.Sprintf("api_rate_limit:%s", userID)

allowed, err := limiter.Allow(c.Request.Context(), key, 1)
if err != nil {
c.JSON(500, gin.H{"error": "Rate limit check failed"})
c.Abort()
return
}

if !allowed {
c.JSON(429, gin.H{"error": "Rate limit exceeded"})
c.Abort()
return
}

c.Next()
}
}

使用场景特点:

  • 突发允许: 用户可以在短时间内快速消耗多个令牌
  • 平均限制: 长期来看保持稳定的请求速率
  • 用户隔离: 不同用户有独立的令牌桶

2. 消息队列消费限流

// 消息队列消费者限流
type MessageConsumer struct {
limiter *TokenBucketLimiter
}

func (c *MessageConsumer) ProcessMessage(msg Message) error {
// 不同优先级消息消耗不同数量的令牌
tokens := 1
if msg.Priority == "high" {
tokens = 2
}

allowed, err := c.limiter.Allow(context.Background(),
"message_consumer", tokens)
if err != nil {
return err
}

if !allowed {
// 将消息重新放入队列或延迟处理
return c.DelayMessage(msg)
}

return c.handleMessage(msg)
}

3. 分布式系统调用限流

// 服务间调用限流
func (s *ServiceClient) CallDownstream(ctx context.Context, req Request) (*Response, error) {
// 根据下游服务能力设置不同的限流参数
var limiter *TokenBucketLimiter
switch req.ServiceName {
case "payment":
limiter = s.paymentLimiter // 100 capacity, 50 rate
case "notification":
limiter = s.notificationLimiter // 50 capacity, 20 rate
}

key := fmt.Sprintf("downstream_call:%s", req.ServiceName)
allowed, err := limiter.Allow(ctx, key, 1)
if err != nil {
return nil, err
}

if !allowed {
return nil, errors.New("rate limit exceeded for downstream service")
}

return s.makeRequest(ctx, req)
}

性能优化与最佳实践

1. 连接池配置

// Redis 连接池优化
func NewRedisClient() *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "localhost:6379",
PoolSize: 20, // 连接池大小
MinIdleConns: 5, // 最小空闲连接
MaxRetries: 3, // 重试次数
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
})
}

2. 批量操作优化

当需要为多个资源检查限流时,可以使用 Pipeline 提升性能:

func (l *TokenBucketLimiter) AllowBatch(ctx context.Context, 
requests []RateLimitRequest) ([]bool, error) {

pipe := l.client.Pipeline()

// 批量执行脚本
var cmds []*redis.Cmd
for _, req := range requests {
cmd := l.script.Run(ctx, pipe,
[]string{req.Key},
l.capacity, l.rate, req.Tokens, float64(time.Now().Unix()),
)
cmds = append(cmds, cmd)
}

_, err := pipe.Exec(ctx)
if err != nil {
return nil, err
}

// 处理结果
results := make([]bool, len(requests))
for i, cmd := range cmds {
result, err := cmd.Result()
if err != nil {
return nil, err
}
results[i] = result.(int64) == 1
}

return results, nil
}

3. 错误处理与降级

func (l *TokenBucketLimiter) AllowWithFallback(ctx context.Context, 
key string, tokens int, fallbackAllow bool) bool {

allowed, err := l.Allow(ctx, key, tokens)
if err != nil {
// Redis 故障时的降级策略
log.Errorf("Rate limiter error: %v, using fallback: %v", err, fallbackAllow)
return fallbackAllow
}

return allowed
}

4. 监控与报警

// 限流指标收集
type RateLimiterMetrics struct {
AllowedCounter prometheus.Counter
DeniedCounter prometheus.Counter
ErrorCounter prometheus.Counter
LatencyHist prometheus.Histogram
}

func (l *TokenBucketLimiter) AllowWithMetrics(ctx context.Context,
key string, tokens int, metrics *RateLimiterMetrics) (bool, error) {

start := time.Now()
allowed, err := l.Allow(ctx, key, tokens)
metrics.LatencyHist.Observe(time.Since(start).Seconds())

if err != nil {
metrics.ErrorCounter.Inc()
return false, err
}

if allowed {
metrics.AllowedCounter.Inc()
} else {
metrics.DeniedCounter.Inc()
}

return allowed, nil
}

与其他限流算法对比

算法突发处理实现复杂度内存占用适用场景
令牌桶✅ 支持中等API限流、服务调用
漏桶❌ 不支持简单消息队列、数据处理
滑动窗口✅ 支持复杂精确统计场景
固定窗口⚠️ 部分支持简单简单限流场景

令牌桶算法在大多数业务场景中都能提供良好的平衡,既能处理突发流量,又能保证长期的限流效果,是分布式系统中最常用的限流算法之一。